package com.lagou.flink.work.p4;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;

/**
 * 功能描述：
 *
 * @author : zhangyong
 * @since : 2021/5/18
 */
public class WaterMarkerTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> data = env.socketTextStream("teacher2", 7777);
        SingleOutputStreamOperator<Tuple2<String, Long>> maped = data.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String s) throws Exception {
                String[] split = s.split(",");
                return new Tuple2<>(split[0], Long.valueOf(split[1]));
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Long>> wartermarks = maped.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
            Long currentMaxTimestamp = 0l;
            final Long maxOutOfOrderness = 10000l;
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }

            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                long timestamp = element.f1;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                System.out.println("key:" + element.f0
                                + "...eventtime:[" + element.f1 + "|" + sdf.format(element.f1));
                System.out.println("currentMaxTimestamp" + currentMaxTimestamp + "..." +
                        sdf.format(currentMaxTimestamp));
                System.out.println("watermark:" + getCurrentWatermark().getTimestamp() +
                        "..." + sdf.format(getCurrentWatermark().getTimestamp()));
                return timestamp;
            }
        });
        SingleOutputStreamOperator<String> res =
                wartermarks.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
           @Override
           public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>>
                   input, Collector<String> out) throws Exception {
               String key = tuple.toString();
               ArrayList<Long> list = new ArrayList<>();
               Iterator<Tuple2<String, Long>> it = input.iterator();
               while (it.hasNext()) {
                   Tuple2<String, Long> next = it.next();
                   list.add(next.f1);
               }
               Collections.sort(list);
               SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
               String result = key + "," + list.size() + "," + sdf.format(list.get(0)) +
                       "," + sdf.format(list.get(list.size() - 1)) + "," + sdf.format(window.getStart()) + "," +
                       sdf.format(window.getEnd());
               out.collect(result);
           }
       });
        res.print();
        env.execute();
    }
}
